该部分主要描述CEDA各个组件的安装配置使用以及代码示例等内容
第一步:CEDA产品环境配置
第二步:导入Sample代码
第三步:初识CEDA Server/Client API
第四步:初识消息中间件AMQ
第五步:初始注册服务器
第六步:初识安全通讯服务器ACS
第七步:初识客户端框架
ATF客户端系统要求:Windows XP/Windows 7, .net framework 2.0
sp2/.net framework 3.5sp1
服务器端系统要求:Linux(CentOS5.4 或 RHEL5.4), Java 6
Java:JDK 6, Eclipse 3.2
C++:VisualStudio 2008
C#:VisualStudio 2008
1.从Eclipse导入工程
2.选择next
3.选择sample代码所在的文件夹
4.点击finish,完成代码导入
安装了Visual Studio 2008后直接点击例子里面的*.csproj文件, 就可以打开sample代码.
API集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面。
publicclass P2PPublisher implements IServerConnectionListener
// 设置server参数
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setHost(host);
serviceInfo.setPort(port);
// 启动server
serverHandler = ServiceManager.getInstance().startServer(
serviceInfo, this);
class MessageHandler implements IConnectListener
publicboolean userValidation(UserInfo arg0, IServerConnectionconnHandler)
{
MessageHandler msgHandler = new MessageHandler();
connHandler.setListener(msgHandler);
returntrue;
}
publicboolean userValidation(ClientInfo arg0, IServerConnectionconnHandler)
{
MessageHandler msgHandler = new MessageHandler();
connHandler.setListener(msgHandler);
returntrue;
}
1 IEventListener: 监听与server的连接信息
2 IMessageListener: 从server接收消息
publicclass P2PSubscriber implements IMessageListener, IEventListener
info.setUser("test", "test");ClientInfo info = new ClientInfo();
info.setUser("test", "test");
// 设置服务端的IP地址和端口号
info.setAddress("serverHost", "serverPort");
// 创建连接
IClientConnection conn = ClientConnectionFactory
.createConnection(info);
conn.addEventListener(this);
conn.start();
// 订阅topic
IClientSession session = conn.createSession();
consumer = session.createConsumer(new Destination(topic));
consumer.addMessageListener(this);
publicvoid onSubscribe(List
IServerConnection connHandler) {
serverHandler.subscribe(topicList, connHandler);
}
publicvoid onUnSubscribe(List topicList,
IServerConnection connHandler)
{
serverHandler.unSubscribe(topicList, connHandler);
}
// 构建消息
Message msg = new Message();
msg.setDestination(new Destination(topic));
msg.getMessageBody().addString((short) 1, content);
msg.getMessageBody().addBytes((short) 2, buf);
// 发送消息
serverHandler.sendMessage(msg);
// 订阅topic
IClientSession session = conn.createSession();
MessageConsumer consumer = session.createConsumer(new
Destination(
topic));
consumer.addMessageListener(this);
// 发送request
MessageSender sender = session.createProducer();
requestCount++;
// 构造request消息
Message msgRequest = new Message();
msgRequest.setDestination(new Destination(topic));
msgRequest.setMessageID(requestCount);
msgRequest.setMessageBody(new MessageBody());
// 设置request参数
msgRequest.getMessageBody().addInt((short) 1, 1);
msgRequest.getMessageBody().addInt((short) 2, 2);
// 发送request消息
sender.send(msgRequest);
publicvoid onMessage(Message message) {
if (message != null) {
try {
System.out.printf("Receive a message : id=%d,
result=%d",message.getMessageID(),
message.getMessageBody().getInt((short) 1));
} catch (MessageBodyException e) {
e.printStackTrace();
}
}
}
在MessageHandler的onMessage接口里面接收client request的消息, 然后生成reply 消息给client:
// 从request消息生成reply消息
Message msgReply = msgRequest.createReplyMessage();
// 获取请求参数
int arg1 = msgRequest.getMessageBody().getInt((short) 1);
int arg2 = msgRequest.getMessageBody().getInt((short) 2);
int result = arg1 + arg2;
// 设置结果
msgReply.getMessageBody().addInt((short) 1, result);
// 返回reply消息
connHandler.sendMessage(msgReply);
client订阅消息 ats/example/java/P2PSubscriber.java
server发布消息 ats/example/java/P2PPublisher.java
client request ats/example/java/P2PRequestClient.java
server reply ats/example/java/P2PReplyServer.java
下载和安装最新版本的AMQ,请见下载页面。
ClientInfo info = new ClientInfo();
info.setUser("test", "test");
// 设置AMQ的IP地址和端口号
info.setAddress(host, port);
// 创建连接
IClientConnection conn = ClientConnectionFactory
.createConnection(info);
conn.addEventListener(this);
conn.start();
// 订阅topic,接收消息
IClientSession session = conn.createSession();
MessageConsumer consumer = session.createConsumer(new Destination(topic));
consumer.addMessageListener(this);
publicvoid onMessage(Message message)
{
if (message != null)
{
try {
System.out.println("Receive a message : "
+ message.getMessageBody().getString((short) 1));
}
catch (MessageBodyException e) {
e.printStackTrace();
}
}
}
// 创建Session和Sender
session = conn.createSession();
sender = session.createProducer();
// 构建消息
Message msg = new Message();
msg.setDestination(new Destination(topic));
msg.setMessageBody(new MessageBody());
msg.getMessageBody().addString((short) 1, content);
msg.getMessageBody().addBytes((short) 2, buf);
// 发送消息
sender.send(msg);
订阅消息 ats/example/java/MQPublisher.java
发布消息 ats/example/java/MQSubscriber.java
注册服务器集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面。
为了连接到注册服务器, 需要实现以下2个接口:
1.IEventListener: 应用和注册服务器的连接状态信息
2.IClusterListener: 注册应用变动信息
serviceInfo = new ServiceInfo();
serviceInfo.setHost("127.0.0.1");
// 使用register server balance模式
serviceInfo.setType(ServiceInfo.SERVICE_TYPE_STANDBY);
serviceInfo.setName(this.getClass().getSimpleName());
// 监听连接到register server的事件
ServiceManager.getInstance().addRegisterEventListener(this);
serviceInfo.setType(ServiceInfo.SERVICE_TYPE_STANDBY);
serviceInfo.setType(ServiceInfo.SERVICE_TYPE_BLANCE);
// 创建集群
registerClient = ServiceManager.getInstance().createClustClient(
serviceInfo.getName());
// 监听集群事件
registerClient.addListener(this);
ServiceManager.getInstance().connectRegister(host + ":" + port);
publicvoid onEvent(int nCode) {
switch (nCode) {
case IEventListener.CONNECTION_CONNECTED:
System.out.println("Service: register CONNECTION_CONNECTED");
// 连接后注册服务
registerService();
break;
case IEventListener.CONNECTION_CLOSED:
System.out.println("Service: register CONNECTION_CLOSED");
break;
}
}
registerClient.registService(serviceInfo);
注册服务后, 从IClusterListener的onClustChange里监听所有服务的变动情况(新注册服务, 已注册的服务退出等) 服务变动时, 比较应用自己id与work service的id, 如果相同, 则启动服务:
// 获取work service列表
List infoList = registerClient.getWorkService();
for (ServiceInfo sinfo : infoList) {
// 如果是主server,则启动服务
if (sinfo.getSequenceName().equals(serviceInfo.getSequenceName())) {
// 启动服务
if (!isStart) {
System.out.println("start server: "
+ sinfo.getSequenceName());
isStart = true;
}
}
}
balance模式在初始化后, 先启动服务, 然后再连接注册服务器注册, 不再需要监听onClustChange事件来启动服务.
完整示例代码在目录/example/java/03 RegistryServer/下:
standby服务器端 ats/example/java/StandbyServer.java
balance服务器端 ats/example/java/BalanceServer.java
standby客户端 ats/example/java/StandbyClient.java
balance客户端 ats/example/java/BalanceClient.java
安全通讯服务器集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面。
ACS可以作为消息中间件, 类似于AMQ功能. 消息订阅者和发布者需要以http方式连接上:
ClientInfo info = new ClientInfo();
info.setUser("test", "test");
// 设置ACS的url和端口号
info.setAddress(host, port);
// 设置连接协议,ACS使用http/https
info.setProtocol(ClientInfo.PROTOCOL_HTTP);
ACS的host是一个url.
后台服务注册到registry server, 配置ACS连接到相同的registry server, 并且该服务配置到ACS的服务列表里面:
[Register]
#registry server地址,多个地址用“,”分隔
Address=192.168.1.111:2182
#需要连接的Service
RequestService=ACSReplyServer,ACSPublishServer
// 创建连接
conn = ClientConnectionFactory.createConnection(info);
// 设置要连接的服务名称
conn.setMQServer(ACSReplyServer.class.getSimpleName());
conn.addEventListener(this);
conn.start();
以后该连接调用的服务就是后台对应的服务.
作为消息中间件的订阅/发布例子 : ats/example/java/ACSPublisher.java
ats/example/java/ACSSubscriber.java
作为中转服务器的request/reply例子: ats/example/java/ACSReplyServer.java
ats/example/java/ACSRequestClient.java
作为中转服务器的订阅/发布例子 : ats/example/java/ACSPublishServer.java
ats/example/java/ACSSubscribeClient.java
客户端框架集成在CEDA的安装包中,下载请见CEDA安装包的下载页面, 客户端框架中CEDA安装包根目录下/atf文件夹下.
需要根据服务器配置修改/atf/control/SampleLocalEnvSetting.xml中的服务器地址:
双击/atf文件夹下的AtfLite.exe就可以打开框架
1 Visual Studio新建一个Windows Forms的class library工程
2 引用必需的dll(可知/atf/bin下找到):
Atf.Plugin.dllDevExpress.Data.v12.1.dllDevExpress.Utils.v12.1.dllDevExpress.XtraEditors
.v12.1.dll
3 新建一个UserControl, 把继承的类由UserControl改成Atf.Plugin.Imp.IPluginDevImpl
1.布局界面, 界面逻辑编码等
2 在/atf/control/AppModules.xml里面增加一个plugin
3.在/atf/control/AppModules.xml里面增加一个module
4 运行框架, 就可以看到插件在左边侧栏里面的模块下面
5.双击打开插件
这是一个初始化使用request/reply从后台获取数据, 后续通过订阅/发布消息实现更新的例子. 后台服务代码见目录/example/java/ 05MarketDataServer/.
1.布局界面
2.点击获取价格, 先创建消息处理器MessageHandler,订阅消息:
if (msgHandler == null)
{
msgHandler = MQIIIManager.Instance.RegisterHandler(this.serverName, this.GetType().Name);
msgHandler.MQMessage += newMessageHandler.OnMQMessageDelegate(msgHandler_MQMessage);
}
//订阅主题
msgHandler.SubscribeTopics(newstring[] /p>
{ this.topic });
然后启动线程, 从后台获取初始化数据:
//获取初始化数据,需要在线程里进行
ThreadPool.QueueUserWorkItem(newWaitCallback(RequestInitData));
使用request/reply方式从后台获取初始化数据:
//构建请求消息
com.adaptiveMQ2.message.Message msg = new com.adaptiveMQ2.message.Message();
msg.Destination = new com.adaptiveMQ2.message.BaseDestination(this.topic);
msg.MessageBody = new com.adaptiveMQ2.message.MessageBody();
msg.SvrID = this.serverName;
//请求数据
com.adaptiveMQ2.message.Message replyMsg = MQIIIManager.Instance.RequestMessage(this.serverName, msg);
//处理返回的数据
OnData(replyMsg);
3.订阅的实时消息到来时的处理:
沪公网安备 31011502002921号 技术支持 - 上海子午线新荣科技有限公司 | 产品授权